#!/usr/bin/env python
#
# Copyright 2011, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Test for end-to-end."""


import logging
import os
import signal
import socket
import subprocess
import sys
import time
import unittest

import set_sys_path  # Update sys.path to locate mod_pywebsocket module.

from test import client_for_testing


# Special message that tells the echo server to start closing handshake
_GOODBYE_MESSAGE = 'Goodbye'

# If you want to use external server to run end to end tests, set following
# parameters correctly.
_use_external_server = False
_external_server_port = 0


# Test body functions
def _echo_check_procedure(client):
    client.connect()

    client.send_message('test')
    client.assert_receive('test')

    client.send_close()
    client.assert_receive_close()

    client.assert_connection_closed()


def _echo_check_procedure_with_binary(client):
    client.connect()

    client.send_message('binary', binary=True)
    client.assert_receive('binary', binary=True)
    client.send_message('\x00\x80\xfe\xff\x00\x80', binary=True)
    client.assert_receive('\x00\x80\xfe\xff\x00\x80', binary=True)

    client.send_close()
    client.assert_receive_close()

    client.assert_connection_closed()


def _echo_check_procedure_with_goodbye(client):
    client.connect()

    client.send_message('test')
    client.assert_receive('test')

    client.send_message(_GOODBYE_MESSAGE)
    client.assert_receive(_GOODBYE_MESSAGE)

    client.assert_receive_close()
    client.send_close()

    client.assert_connection_closed()


def _echo_check_procedure_with_code_and_reason(client, code, reason):
    client.connect()

    client.send_close(code, reason)
    client.assert_receive_close(code, reason)

    client.assert_connection_closed()


def _unmasked_frame_check_procedure(client):
    client.connect()

    client.send_message('test', mask=False)
    client.assert_receive_close(client_for_testing.STATUS_PROTOCOL_ERROR, '')

    client.assert_connection_closed()


class EndToEndTest(unittest.TestCase):
    """An end-to-end test that launches pywebsocket standalone server as a
    separate process, connects to it using the client_for_testing module, and
    checks if the server behaves correctly by exchanging opening handshake and
    frames over a TCP connection.
    """

    def setUp(self):
        self.server_stderr = None
        self.top_dir = os.path.join(os.path.split(__file__)[0], '..')
        os.putenv('PYTHONPATH', os.path.pathsep.join(sys.path))
        self.standalone_command = os.path.join(
            self.top_dir, 'mod_pywebsocket', 'standalone.py')
        self.document_root = os.path.join(self.top_dir, 'example')
        s = socket.socket()
        s.bind(('localhost', 0))
        (_, self.test_port) = s.getsockname()
        s.close()

        self._options = client_for_testing.ClientOptions()
        self._options.server_host = 'localhost'
        self._options.origin = 'http://localhost'
        self._options.resource = '/echo'

        # TODO(toyoshim): Eliminate launching a standalone server on using
        # external server.

        if _use_external_server:
            self._options.server_port = _external_server_port
        else:
            self._options.server_port = self.test_port

    def _run_python_command(self, commandline, stdout=None, stderr=None):
        return subprocess.Popen([sys.executable] + commandline, close_fds=True,
                                stdout=stdout, stderr=stderr)

    def _run_server(self, allow_draft75=False):
        args = [self.standalone_command,
                '-H', 'localhost',
                '-V', 'localhost',
                '-p', str(self.test_port),
                '-P', str(self.test_port),
                '-d', self.document_root]

        # Inherit the level set to the root logger by test runner.
        root_logger = logging.getLogger()
        log_level = root_logger.getEffectiveLevel()
        if log_level != logging.NOTSET:
            args.append('--log-level')
            args.append(logging.getLevelName(log_level).lower())

        if allow_draft75:
            args.append('--allow-draft75')

        return self._run_python_command(args,
                                        stderr=self.server_stderr)

    def _kill_process(self, pid):
        if sys.platform in ('win32', 'cygwin'):
            subprocess.call(
                ('taskkill.exe', '/f', '/pid', str(pid)), close_fds=True)
        else:
            os.kill(pid, signal.SIGKILL)

    def _run_hybi_test_with_client_options(self, test_function, options):
        server = self._run_server()
        try:
            # TODO(tyoshino): add some logic to poll the server until it
            # becomes ready
            time.sleep(0.2)

            client = client_for_testing.create_client(options)
            try:
                test_function(client)
            finally:
                client.close_socket()
        finally:
            self._kill_process(server.pid)

    def _run_hybi_test(self, test_function):
        self._run_hybi_test_with_client_options(test_function, self._options)

    def _run_hybi_deflate_test(self, test_function):
        server = self._run_server()
        try:
            time.sleep(0.2)

            self._options.use_deflate_stream = True
            client = client_for_testing.create_client(self._options)
            try:
                test_function(client)
            finally:
                client.close_socket()
        finally:
            self._kill_process(server.pid)

    def _run_hybi_deflate_frame_test(self, test_function):
        server = self._run_server()
        try:
            time.sleep(0.2)

            self._options.use_deflate_frame = True
            client = client_for_testing.create_client(self._options)
            try:
                test_function(client)
            finally:
                client.close_socket()
        finally:
            self._kill_process(server.pid)

    def _run_hybi_close_with_code_and_reason_test(self, test_function, code,
                                                  reason):
        server = self._run_server()
        try:
            time.sleep(0.2)

            client = client_for_testing.create_client(self._options)
            try:
                test_function(client, code, reason)
            finally:
                client.close_socket()
        finally:
            self._kill_process(server.pid)

    def _run_hybi_http_fallback_test(self, options, status):
        server = self._run_server()
        try:
            time.sleep(0.2)

            client = client_for_testing.create_client(options)
            try:
                client.connect()
                self.fail('Could not catch HttpStatusException')
            except client_for_testing.HttpStatusException, e:
                self.assertEqual(status, e.status)
            except Exception, e:
                self.fail('Catch unexpected exception')
            finally:
                client.close_socket()
        finally:
            self._kill_process(server.pid)

    def test_echo(self):
        self._run_hybi_test(_echo_check_procedure)

    def test_echo_binary(self):
        self._run_hybi_test(_echo_check_procedure_with_binary)

    def test_echo_server_close(self):
        self._run_hybi_test(_echo_check_procedure_with_goodbye)

    def test_unmasked_frame(self):
        self._run_hybi_test(_unmasked_frame_check_procedure)

    def test_echo_deflate(self):
        self._run_hybi_deflate_test(_echo_check_procedure)

    def test_echo_deflate_server_close(self):
        self._run_hybi_deflate_test(_echo_check_procedure_with_goodbye)

    def test_echo_deflate_frame(self):
        self._run_hybi_deflate_frame_test(_echo_check_procedure)

    def test_echo_deflate_frame_server_close(self):
        self._run_hybi_deflate_frame_test(
            _echo_check_procedure_with_goodbye)

    def test_echo_close_with_code_and_reason(self):
        self._options.resource = '/close'
        self._run_hybi_close_with_code_and_reason_test(
            _echo_check_procedure_with_code_and_reason, 3333, "sunsunsunsun")

    def test_close_on_protocol_error(self):
        """Tests that the server sends a close frame with protocol error status
        code when the client sends data with some protocol error.
        """

        def test_function(client):
            client.connect()

            # Intermediate frame without any preceding start of fragmentation
            # frame.
            client.send_frame_of_arbitrary_bytes('\x80\x80', '')
            client.assert_receive_close(
                client_for_testing.STATUS_PROTOCOL_ERROR)

        self._run_hybi_test(test_function)

    def test_close_on_unsupported_frame(self):
        """Tests that the server sends a close frame with unsupported operation
        status code when the client sends data asking some operation that is
        not supported by the server.
        """

        def test_function(client):
            client.connect()

            # Text frame with RSV3 bit raised.
            client.send_frame_of_arbitrary_bytes('\x91\x80', '')
            client.assert_receive_close(
                client_for_testing.STATUS_UNSUPPORTED)

        self._run_hybi_test(test_function)

    def test_close_on_invalid_frame(self):
        """Tests that the server sends a close frame with invalid frame payload
        data status code when the client sends an invalid frame like containing
        invalid UTF-8 character.
        """

        def test_function(client):
            client.connect()

            # Text frame with invalid UTF-8 string.
            client.send_message('\x80', raw=True)
            client.assert_receive_close(
                client_for_testing.STATUS_INVALID_FRAME_PAYLOAD)

        self._run_hybi_test(test_function)

    def _run_hybi00_test(self, test_function):
        server = self._run_server()
        try:
            time.sleep(0.2)

            client = client_for_testing.create_client_hybi00(self._options)
            try:
                test_function(client)
            finally:
                client.close_socket()
        finally:
            self._kill_process(server.pid)

    def test_echo_hybi00(self):
        self._run_hybi00_test(_echo_check_procedure)

    def test_echo_server_close_hybi00(self):
        self._run_hybi00_test(_echo_check_procedure_with_goodbye)

    def _run_hixie75_test(self, test_function):
        server = self._run_server(allow_draft75=True)
        try:
            time.sleep(0.2)

            client = client_for_testing.create_client_hixie75(self._options)
            try:
                test_function(client)
            finally:
                client.close_socket()
        finally:
            self._kill_process(server.pid)

    def test_echo_hixie75(self):
        """Tests that the server can talk draft-hixie-thewebsocketprotocol-75
        protocol.
        """

        def test_function(client):
            client.connect()

            client.send_message('test')
            client.assert_receive('test')

        self._run_hixie75_test(test_function)

    def test_echo_server_close_hixie75(self):
        """Tests that the server can talk draft-hixie-thewebsocketprotocol-75
        protocol. At the end of message exchanging, the client sends a keyword
        message that requests the server to close the connection, and then
        checks if the connection is really closed.
        """

        def test_function(client):
            client.connect()

            client.send_message('test')
            client.assert_receive('test')

            client.send_message(_GOODBYE_MESSAGE)
            client.assert_receive(_GOODBYE_MESSAGE)

        self._run_hixie75_test(test_function)

    # TODO(toyoshim): Add tests to verify invalid absolute uri handling like
    # host unmatch, port unmatch and invalid port description (':' without port
    # number).

    def test_absolute_uri(self):
        """Tests absolute uri request."""

        options = self._options
        options.resource = 'ws://localhost:%d/echo' % options.server_port
        self._run_hybi_test_with_client_options(_echo_check_procedure, options)

    def test_origin_check(self):
        """Tests http fallback on origin check fail."""

        options = self._options
        options.resource = '/origin_check'
        # Server shows warning message for http 403 fallback. This warning
        # message is confusing. Following pipe disposes warning messages.
        self.server_stderr = subprocess.PIPE
        self._run_hybi_http_fallback_test(options, 403)

    def test_version_check(self):
        """Tests http fallback on version check fail."""

        options = self._options
        options.version = 99
        self.server_stderr = subprocess.PIPE
        self._run_hybi_http_fallback_test(options, 400)

    def test_example_echo_client(self):
        """Tests that the echo_client.py example can talk with the server."""

        server = self._run_server()
        try:
            time.sleep(0.2)

            client_command = os.path.join(
                self.top_dir, 'example', 'echo_client.py')
            args = [client_command,
                    '-p', str(self._options.server_port)]
            client = self._run_python_command(args, stdout=subprocess.PIPE)
            stdoutdata, stderrdata = client.communicate()
            actual = stdoutdata.decode("utf-8")
            expected = ('Send: Hello\n' 'Recv: Hello\n'
                u'Send: \u65e5\u672c\n' u'Recv: \u65e5\u672c\n'
                'Send close\n' 'Recv ack\n')
            if actual != expected:
                raise Exception('Unexpected result on example echo client: '
                                '%r (expected) vs %r (actual)' %
                                (expected, actual))
            if stderrdata is not None:
                raise Exception('Unexpected error message on example echo '
                                'client: %r' % stderrdata)
        finally:
            self._kill_process(server.pid)


if __name__ == '__main__':
    unittest.main()


# vi:sts=4 sw=4 et
